package com.zhang.edu.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.zhang.edu.realtime.utils.DimUtil;
import com.zhang.edu.realtime.utils.PhoenixUtil;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class DimPhoenixSinkFunc implements SinkFunction<JSONObject> {
    @Override
    public void invoke(JSONObject jsonObj, Context context) {
        // 获取目标表名
        String sinkTable = jsonObj.getString("sink_table");
        // 获取操作类型
        String type = jsonObj.getString("type");
        // 获取 id 字段的值
        String id = jsonObj.getString("id");

        // 清除 JSON 对象中的 sinkTable 字段和 type 字段
        // 以便可将该对象直接用于 HBase 表的数据写入
        jsonObj.remove("sink_table");
        jsonObj.remove("type");

        // 执行写入操作
        PhoenixUtil.executeDML(sinkTable, jsonObj);

        // 如果操作类型为 update，则清除 redis 中的缓存信息
        if ("update".equals(type)) {
            DimUtil.deleteCached(sinkTable, id);
        }
    }
}
